package onion.mqtt.server.processor;

import io.netty.channel.Channel;
import io.netty.handler.codec.mqtt.*;
import onion.mqtt.server.MqttServerBuilder;
import onion.mqtt.server.event.IMqttServerMessageListener;
import onion.mqtt.server.manager.MessageManager;
import onion.mqtt.server.manager.SessionManager;
import onion.mqtt.server.manager.SubscribeManager;
import onion.mqtt.server.store.MessageStore;
import onion.mqtt.server.store.SessionStore;
import org.apache.commons.lang3.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.CompletableFuture;

/**
 * @author Mr, Lu
 * @developmentTeam 浙江允泽信息科技有限公司
 * @createTime 2023/12/12
 */
public class PublishProcessor extends AbstractMqttServerProcessor<MqttPublishMessage> {
    static final Logger log = LoggerFactory.getLogger(PublishProcessor.class);
    private final IMqttServerMessageListener messageEvent;

    public PublishProcessor(MqttServerBuilder serverBuilder) {
        this.messageEvent = serverBuilder.getMessageListener();
    }

    @Override
    public void process(Channel channel, MqttPublishMessage message) {
        // 如果消息为空，直接返回不处理
        if (ObjectUtils.isEmpty(message)) {
            return;
        }
        // 获取参数
        String clientId = getClientId(channel);
        MqttFixedHeader fixedHeader = message.fixedHeader();
        MqttQoS mqttQoS = fixedHeader.qosLevel();
        MqttPublishVariableHeader variableHeader = message.variableHeader();
        String topic = variableHeader.topicName();
        int packetId = variableHeader.packetId();
        log.debug("Publish - clientId: {}, topic: {}, mqttQos: {}, packetId: {}", clientId, topic, mqttQoS, packetId);

        // 处理回复逻辑
        switch (mqttQoS) {
            case AT_MOST_ONCE: // Qos 0
                break;
            case AT_LEAST_ONCE: // Qos1
                if (packetId != -1) {
                    MqttMessage messageAck = MqttMessageBuilders.pubAck()
                            .packetId(packetId)
                            .build();
                    channel.eventLoop().execute(() -> channel.writeAndFlush(messageAck));
                    log.debug("Publish - PubAck send clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topic, mqttQoS, packetId);
                }
                break;
            case EXACTLY_ONCE: // Qos2
                if (packetId != -1) {
                    MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREC, false, MqttQoS.AT_MOST_ONCE, false, 0);
                    MqttMessage mqttMessage = new MqttMessage(mqttFixedHeader, MqttMessageIdVariableHeader.from(packetId));
                    channel.eventLoop().execute(() -> channel.writeAndFlush(mqttMessage));
                    log.debug("Publish - PubRec send clientId:{} topicName:{} mqttQoS:{} packetId:{}", clientId, topic, mqttQoS, packetId);
                }
                break;
        }
        // 消息发布
        invokeListenerForPublish(channel, clientId, mqttQoS, topic, message);
    }

    /**
     * 调用侦听器进行发布
     *
     * @param channel
     * @param clientId
     * @param mqttQoS
     * @param topic
     * @param message
     */
    private void invokeListenerForPublish(Channel channel, String clientId, MqttQoS mqttQoS, String topic, MqttPublishMessage message) {
        log.debug("publish topic: {}", topic);
        MqttFixedHeader fixedHeader = message.fixedHeader();
        boolean isRetain = fixedHeader.isRetain();

        // 处理保留消息
        if (isRetain) {
            byte[] payload = new byte[message.payload().readableBytes()];
            if (MqttQoS.AT_MOST_ONCE == mqttQoS || payload.length == 0) {
                MessageManager.getInstance().removeRetainMessage(topic);
            } else {
                MessageStore retainMessage = new MessageStore();
                retainMessage.setClientId(clientId);
                retainMessage.setTopic(topic);
                retainMessage.setQoS(fixedHeader.qosLevel().value());
                retainMessage.setRetain(true);
                retainMessage.setTimestamp(System.currentTimeMillis());
                retainMessage.setPayload(payload);
                MessageManager.getInstance().addRetainMessage(retainMessage);
            }
        }

        // 消息发布
        SubscribeManager.getInstance().searchSubscribe(topic).forEach(subscribeStore -> {
            if (!clientId.equals(subscribeStore.getClientId())) {
                SessionStore session = SessionManager.getInstance().getSession(subscribeStore.getClientId());
                if (session != null) {
                    MqttPublishMessage msg = MqttMessageBuilders.publish()
                            .topicName(topic)
                            .qos(mqttQoS)
                            .payload(message.payload())
                            .retained(isRetain)
                            .messageId(new Long(System.currentTimeMillis()).intValue())
                            .build();
                    session.getChannel().writeAndFlush(msg);
                }
            }
        });

        // 消息订阅
        CompletableFuture.runAsync(() -> {
            try {
                if (messageEvent!= null) {
                    messageEvent.onMessage(channel, clientId, topic, mqttQoS, message);
                }
            } catch (Throwable e) {
                log.error("publish publishEvent fail clientId: {}.", clientId);
            }
        });

        // 消息流转
    }
}
